feat: retry+backoff en scraper, ProgressReporter en bot
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
This commit is contained in:
+42
-52
@@ -33,17 +33,24 @@ def is_authorized(user_id: int) -> bool:
|
|||||||
return not allowed or user_id in allowed
|
return not allowed or user_id in allowed
|
||||||
|
|
||||||
|
|
||||||
def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str:
|
class ProgressReporter:
|
||||||
scraped = stats.get("scraped") or 0
|
def __init__(self, reply_target: Message):
|
||||||
failed = stats.get("failed") or 0
|
self._reply_target = reply_target
|
||||||
pending = stats.get("pending") or 0
|
self._msg: Optional[Message] = None
|
||||||
skipped = stats.get("skipped") or 0
|
|
||||||
return (
|
async def start(self, text: str):
|
||||||
f"🔄 *Iteration {iteration}*\n"
|
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||||
f"📚 Sources found: `{total}`\n"
|
|
||||||
f"✅ Scraped: `{scraped}` | ⏭️ Skipped: `{skipped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n"
|
async def update(self, text: str):
|
||||||
f"🆕 New URLs this round: `{new}`"
|
if not self._msg:
|
||||||
)
|
return
|
||||||
|
try:
|
||||||
|
await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def done(self, text: str):
|
||||||
|
await self.update(text)
|
||||||
|
|
||||||
|
|
||||||
async def send_chunked(message: Message, text: str, parse_mode=None):
|
async def send_chunked(message: Message, text: str, parse_mode=None):
|
||||||
@@ -91,77 +98,54 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check for existing active research
|
|
||||||
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
||||||
await update.message.reply_text(
|
await update.message.reply_text(
|
||||||
"⚠️ Research already in progress. Use /status or /finish first."
|
"⚠️ Research already in progress. Use /status or /finish first."
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
msg = await update.message.reply_text(
|
|
||||||
f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n"
|
|
||||||
f"🌱 Seeding sources from:\n"
|
|
||||||
f"• DuckDuckGo (8 queries)\n"
|
|
||||||
f"• Wikipedia + internal links\n"
|
|
||||||
f"• Reddit top posts\n"
|
|
||||||
f"• YouTube transcripts\n\n"
|
|
||||||
f"This will run exhaustively until saturation. Use /finish to stop early.",
|
|
||||||
parse_mode=ParseMode.MARKDOWN
|
|
||||||
)
|
|
||||||
|
|
||||||
async def run_research():
|
async def run_research():
|
||||||
db_conn = await get_db()
|
db_conn = await get_db()
|
||||||
db = ResearchDB(db_conn)
|
db = ResearchDB(db_conn)
|
||||||
|
reporter = None
|
||||||
try:
|
try:
|
||||||
session_id = await db.create_session(topic, chat_id)
|
session_id = await db.create_session(topic, chat_id)
|
||||||
_active_sessions[chat_id] = session_id
|
_active_sessions[chat_id] = session_id
|
||||||
|
|
||||||
progress_msg = msg
|
reporter = ProgressReporter(update.message)
|
||||||
iteration_count = [0]
|
await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
|
||||||
|
|
||||||
async def on_progress(iteration, total, new_this_round, stats):
|
async def on_progress(iter_num, total_sources):
|
||||||
iteration_count[0] = iteration
|
await reporter.update(
|
||||||
text = fmt_progress(iteration, total, new_this_round, stats)
|
f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
|
||||||
try:
|
)
|
||||||
await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
|
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
|
||||||
final_stats = await scraper.run()
|
final_stats = await scraper.run()
|
||||||
|
|
||||||
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
||||||
|
|
||||||
scraped = final_stats.get("scraped", 0)
|
scraped = final_stats.get("scraped", 0)
|
||||||
await update.message.reply_text(
|
|
||||||
f"✅ *Research complete!*\n\n"
|
|
||||||
f"📊 Results:\n"
|
|
||||||
f"• Sources found & scraped: `{scraped}`\n"
|
|
||||||
f"• Iterations: `{iteration_count[0]}`\n\n"
|
|
||||||
f"Now processing content with Ollama...\n"
|
|
||||||
f"Use `/generate podcast|blog|report|thread` when ready.",
|
|
||||||
parse_mode=ParseMode.MARKDOWN
|
|
||||||
)
|
|
||||||
|
|
||||||
# Auto-process after scraping
|
await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
|
||||||
|
|
||||||
ollama = OllamaClient()
|
ollama = OllamaClient()
|
||||||
if await ollama.is_available():
|
if await ollama.is_available():
|
||||||
processor = ContentProcessor(db, ollama)
|
processor = ContentProcessor(db, ollama)
|
||||||
|
|
||||||
async def proc_progress(total_chunks, total_words):
|
async def proc_progress(total_chunks, total_words):
|
||||||
await update.message.reply_text(
|
await reporter.update(
|
||||||
f"🧠 *Processing complete!*\n"
|
f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
|
||||||
f"• Chunks stored: `{total_chunks}`\n"
|
|
||||||
f"• Words researched: `{total_words:,}`\n\n"
|
|
||||||
f"Ready! Use `/generate podcast|blog|report|thread`",
|
|
||||||
parse_mode=ParseMode.MARKDOWN
|
|
||||||
)
|
)
|
||||||
|
|
||||||
await processor.process_session(session_id, topic, proc_progress)
|
await processor.process_session(session_id, topic, proc_progress)
|
||||||
|
chunk_count = await db.get_chunks_count(session_id)
|
||||||
|
await reporter.done(
|
||||||
|
f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
await update.message.reply_text(
|
await reporter.done(
|
||||||
"⚠️ Ollama not reachable — skipping processing.\n"
|
f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
|
||||||
"You can still use `/generate` (will use raw content)."
|
f"Usa /generate para generar contenido."
|
||||||
)
|
)
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
@@ -169,9 +153,15 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
_active_sessions.get(chat_id, 0),
|
_active_sessions.get(chat_id, 0),
|
||||||
status=ResearchStatus.FINISHED
|
status=ResearchStatus.FINISHED
|
||||||
)
|
)
|
||||||
|
if reporter:
|
||||||
|
await reporter.done("🛑 Investigación cancelada.")
|
||||||
|
else:
|
||||||
await update.message.reply_text("🛑 Research cancelled.")
|
await update.message.reply_text("🛑 Research cancelled.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Research task failed", error=str(e))
|
logger.error("Research task failed", error=str(e))
|
||||||
|
if reporter:
|
||||||
|
await reporter.done(f"❌ Error: {str(e)[:200]}")
|
||||||
|
else:
|
||||||
await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
|
await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
|
||||||
finally:
|
finally:
|
||||||
await db_conn.close()
|
await db_conn.close()
|
||||||
|
|||||||
+33
-12
@@ -3,6 +3,7 @@ ResearchOwl Exhaustive Scraper
|
|||||||
Core engine: discovers, expands, and evaluates sources recursively
|
Core engine: discovers, expands, and evaluates sources recursively
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
@@ -54,6 +55,22 @@ REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)")
|
|||||||
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
|
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_with_retry(fetch_fn, source_name: str, max_retries: int = 3):
    """Await ``fetch_fn()`` and retry with exponential backoff on failure.

    Args:
        fetch_fn: Zero-argument callable that returns a fresh awaitable each
            time it is called (an awaitable cannot be awaited twice).
        source_name: Label used in log lines; truncated to 60 characters.
        max_retries: Total number of attempts, including the first one.
            Must be at least 1.

    Returns:
        Whatever the first successful ``fetch_fn()`` call returns.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The exception raised by the final attempt, re-raised
            unchanged once all attempts are used up.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and the function used
        # to end in `raise None`, which surfaces as an opaque TypeError.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            return await fetch_fn()
        except Exception as e:
            if attempt == max_retries - 1:
                logger.warning("fetch_with_retry exhausted", source=source_name[:60], error=str(e))
                # Bare raise keeps the original traceback of the last failure.
                raise
            # Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of random
            # jitter so concurrent retries do not fire in lockstep.
            wait = 2 ** attempt + random.random()
            logger.debug("fetch_with_retry backoff", source=source_name[:60],
                         attempt=attempt + 1, wait=round(wait, 1), error=str(e))
            await asyncio.sleep(wait)
|
||||||
|
|
||||||
|
|
||||||
def detect_source_type(url: str) -> str:
|
def detect_source_type(url: str) -> str:
|
||||||
if YOUTUBE_RE.search(url):
|
if YOUTUBE_RE.search(url):
|
||||||
return "youtube"
|
return "youtube"
|
||||||
@@ -290,12 +307,7 @@ class ExhaustiveScraper:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if self.progress_callback:
|
if self.progress_callback:
|
||||||
await self.progress_callback(
|
await self.progress_callback(self.iteration, self.total_sources)
|
||||||
iteration=self.iteration,
|
|
||||||
total=self.total_sources,
|
|
||||||
new_this_round=new_sources,
|
|
||||||
stats=stats
|
|
||||||
)
|
|
||||||
|
|
||||||
# Saturation check: if we found very few new URLs, we're done
|
# Saturation check: if we found very few new URLs, we're done
|
||||||
if new_sources < 3 and self.iteration > 2:
|
if new_sources < 3 and self.iteration > 2:
|
||||||
@@ -317,9 +329,13 @@ class ExhaustiveScraper:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
if source_type == "youtube":
|
if source_type == "youtube":
|
||||||
content, title = await self._extract_youtube(url)
|
content, title = await fetch_with_retry(
|
||||||
|
lambda: self._extract_youtube(url), url
|
||||||
|
)
|
||||||
elif source_type == "wikipedia":
|
elif source_type == "wikipedia":
|
||||||
content, title, new_urls = await self._extract_wikipedia(url)
|
content, title, new_urls = await fetch_with_retry(
|
||||||
|
lambda: self._extract_wikipedia(url), url
|
||||||
|
)
|
||||||
added = 0
|
added = 0
|
||||||
for new_url in (new_urls or []):
|
for new_url in (new_urls or []):
|
||||||
if self._url_is_relevant(new_url):
|
if self._url_is_relevant(new_url):
|
||||||
@@ -331,13 +347,18 @@ class ExhaustiveScraper:
|
|||||||
await self._mark_scraped(source_id, content, title, url)
|
await self._mark_scraped(source_id, content, title, url)
|
||||||
return added
|
return added
|
||||||
elif source_type == "reddit":
|
elif source_type == "reddit":
|
||||||
content, title = await self._extract_reddit(url)
|
content, title = await fetch_with_retry(
|
||||||
# Small delay between Reddit requests to avoid rate limiting
|
lambda: self._extract_reddit(url), url
|
||||||
|
)
|
||||||
await asyncio.sleep(settings.request_delay)
|
await asyncio.sleep(settings.request_delay)
|
||||||
elif source_type == "pdf":
|
elif source_type == "pdf":
|
||||||
content, title = await self._extract_pdf(url)
|
content, title = await fetch_with_retry(
|
||||||
|
lambda: self._extract_pdf(url), url
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
content, title, new_urls = await self._extract_web(url, source["depth"])
|
content, title, new_urls = await fetch_with_retry(
|
||||||
|
lambda: self._extract_web(url, source["depth"]), url
|
||||||
|
)
|
||||||
added = 0
|
added = 0
|
||||||
for new_url in (new_urls or []):
|
for new_url in (new_urls or []):
|
||||||
if self._url_is_relevant(new_url):
|
if self._url_is_relevant(new_url):
|
||||||
|
|||||||
Reference in New Issue
Block a user