From 0c7176dd0b092fe49c6c7fc815d449f7c618dcd7 Mon Sep 17 00:00:00 2001 From: ChemaVX Date: Mon, 27 Apr 2026 20:37:39 +0000 Subject: [PATCH] fix: add /process command, log quality filtering, improve Reddit headers - bot.py: add cmd_process handler to manually trigger chunk processing on the last session; register CommandHandler("process") - processor.py: log exceptions from asyncio.gather instead of silently dropping them; add per-chunk quality score debug logging; warn when all chunks filtered by quality threshold with actionable hint; raise fallback score to 0.6 so Ollama failures don't filter chunks - exhaustive.py: replace bot User-Agent with full browser UA + headers for REDDIT_HEADERS; downgrade Reddit 403 from warning to info since server IPs are routinely blocked; use content_type=None on json() to avoid aiohttp content-type mismatch errors Co-Authored-By: Claude Sonnet 4.6 --- src/bot/bot.py | 60 ++++++++++++++++++++++++++++++++++++++ src/processor/processor.py | 44 ++++++++++++++++++++++------ src/scraper/exhaustive.py | 34 ++++++++++++++------- 3 files changed, 119 insertions(+), 19 deletions(-) diff --git a/src/bot/bot.py b/src/bot/bot.py index 56f8396..79f4a2f 100644 --- a/src/bot/bot.py +++ b/src/bot/bot.py @@ -66,6 +66,7 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE): "`/research ` — Start exhaustive research\n" "`/status` — Check current research progress\n" "`/finish` — Stop research and proceed to generation\n" + "`/process` — Manually trigger chunk processing\n" "`/generate ` — Generate output (podcast|blog|report|thread)\n" "`/sources` — List all sources found\n" "`/outputs` — List generated outputs\n" @@ -426,6 +427,64 @@ async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE): await db_conn.close() +async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + db_conn = await get_db() + db = ResearchDB(db_conn) + + try: + cursor = await db_conn.execute( + "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", + (chat_id,) + ) + row = await cursor.fetchone() + if not row: + await update.message.reply_text("No research sessions found. Start with /research ") + return + + session = dict(row) + session_id = session["id"] + topic = session["topic"] + + msg = await update.message.reply_text( + f"🧠 Processing session #{session_id}: `{topic}`\n" + f"Chunking & scoring with Ollama ({settings.ollama_model})...\n" + f"This may take a few minutes.", + parse_mode=ParseMode.MARKDOWN + ) + + ollama = OllamaClient() + if not await ollama.is_available(): + await msg.edit_text("❌ Ollama not reachable. Check OLLAMA_URL setting.") + return + + processor = ContentProcessor(db, ollama) + + async def proc_progress(total_chunks, total_words): + try: + await msg.edit_text( + f"🧠 *Processing complete!*\n" + f"• Chunks stored: `{total_chunks}`\n" + f"• Words researched: `{total_words:,}`\n\n" + f"Ready! Use `/generate podcast|blog|report|thread`\n" + f"_If 0 chunks: set `QUALITY_THRESHOLD=0.3` or `0` and retry_", + parse_mode=ParseMode.MARKDOWN + ) + except Exception: + pass + + await processor.process_session(session_id, topic, proc_progress) + + except Exception as e: + logger.error("Process command failed", error=str(e)) + await update.message.reply_text(f"❌ Processing failed: {str(e)[:200]}") + finally: + await db_conn.close() + + async def cmd_cancel(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return @@ -456,6 +515,7 @@ def create_bot() -> Application: app.add_handler(CommandHandler("generate", cmd_generate)) app.add_handler(CommandHandler("sources", cmd_sources)) app.add_handler(CommandHandler("outputs", cmd_outputs)) + app.add_handler(CommandHandler("process", cmd_process)) app.add_handler(CommandHandler("cancel", cmd_cancel)) return app diff --git a/src/processor/processor.py b/src/processor/processor.py index 0fda083..b84cde8 100644 --- a/src/processor/processor.py +++ b/src/processor/processor.py @@ -137,8 +137,11 @@ class ContentProcessor: results = await asyncio.gather(*[process_one(s) for s in scraped], return_exceptions=True) - for r in results: - if isinstance(r, int): + for i, r in enumerate(results): + if isinstance(r, Exception): + logger.error("Source processing raised exception", + source_id=scraped[i]["id"], error=str(r), exc_info=r) + elif isinstance(r, int): total_chunks += r total_words = sum(s.get("word_count", 0) for s in scraped) @@ -159,17 +162,27 @@ class ContentProcessor: content = await self.db.get_source_content(source_id) if not content: + logger.warning("No content in source_contents", source_id=source_id) return 0 chunks = simple_chunk(content, settings.chunk_size, settings.chunk_overlap) + logger.info("Processing source", source_id=source_id, + content_len=len(content), num_chunks=len(chunks), + quality_threshold=settings.quality_threshold) stored = 0 + filtered_quality = 0 for i, chunk in enumerate(chunks): - if len(chunk.split()) < 30: + words = len(chunk.split()) + if words < 30: continue quality = await self._score_quality(chunk, topic) if quality < settings.quality_threshold: + filtered_quality += 1 + logger.debug("Chunk filtered by quality", source_id=source_id, + chunk_index=i, quality=round(quality, 2), + threshold=settings.quality_threshold, words=words) continue embedding = await self.ollama.embed(chunk[:1000]) @@ -179,12 +192,22 @@ class ContentProcessor: source_id=source_id, content=chunk, chunk_index=i, - token_count=len(chunk.split()), + token_count=words, quality_score=quality, embedding=embedding ) stored += 1 + if filtered_quality > 0 and stored == 0: + logger.warning( + "All chunks filtered by quality — consider lowering QUALITY_THRESHOLD " + "(currently %.1f) or set QUALITY_THRESHOLD=0 to disable", + settings.quality_threshold, + source_id=source_id, chunks_total=len(chunks), + chunks_filtered=filtered_quality + ) + + logger.info("Source processed", source_id=source_id, stored=stored) return stored async def _score_quality(self, chunk: str, topic: str) -> float: @@ -204,14 +227,17 @@ Respond with ONLY a single number 0-10. No explanation.""" try: response = await self.ollama.generate(prompt) - # Extract number from response numbers = re.findall(r'\b(\d+(?:\.\d+)?)\b', response) if numbers: score = float(numbers[0]) - return min(1.0, score / 10.0) - return 0.5 - except Exception: - return 0.5 # default on error + normalized = min(1.0, score / 10.0) + logger.debug("Quality score", raw=score, normalized=round(normalized, 2)) + return normalized + logger.debug("No number in quality response", response=response[:80]) + return 0.6 # above threshold so chunk is kept + except Exception as e: + logger.warning("Quality scoring failed", error=str(e)) + return 0.6 # above threshold so chunk is kept on Ollama error async def rag_query(self, session_id: int, query: str, top_k: int = 20) -> str: """ diff --git a/src/scraper/exhaustive.py b/src/scraper/exhaustive.py index c849e5e..f07b5cf 100644 --- a/src/scraper/exhaustive.py +++ b/src/scraper/exhaustive.py @@ -23,9 +23,21 @@ from src.db.database import ResearchDB logger = structlog.get_logger() HEADERS = { - "User-Agent": "Mozilla/5.0 (compatible; ResearchOwl/1.0; +https://chemavx.xyz)", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,es;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "DNT": "1", +} + +# Reddit requires its own headers — generic bots get 403 +REDDIT_HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + "Accept": "application/json, text/javascript, */*; q=0.01", + "Accept-Language": "en-US,en;q=0.9", + "Accept-Encoding": "gzip, deflate, br", + "Referer": "https://www.reddit.com/", + "X-Requested-With": "XMLHttpRequest", } # Domains to skip — not useful for research @@ -177,10 +189,10 @@ class ExhaustiveScraper: """Search Reddit — sequential to avoid rate limiting""" try: http = await self._get_http() - url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=15" - async with http.get(url, headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"}) as resp: + url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=15&type=link" + async with http.get(url, headers=REDDIT_HEADERS) as resp: if resp.status == 200: - data = await resp.json() + data = await resp.json(content_type=None) posts = data.get("data", {}).get("children", []) for post in posts: post_data = post.get("data", {}) @@ -192,6 +204,8 @@ class ExhaustiveScraper: title=post_data.get("title") ) logger.info("Reddit seed", found=len(posts), status=resp.status) + elif resp.status == 403: + logger.info("Reddit seed blocked (403) — server IP likely blocked by Reddit; skipping") else: logger.warning("Reddit seed non-200", status=resp.status) except Exception as e: @@ -446,14 +460,14 @@ class ExhaustiveScraper: json_url = url.rstrip("/") + ".json?limit=100&sort=top" http = await self._get_http() try: - async with http.get( - json_url, - headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"} - ) as resp: + async with http.get(json_url, headers=REDDIT_HEADERS) as resp: + if resp.status == 403: + logger.info("Reddit post blocked (403) — skipping", url=url[:60]) + return None, None if resp.status != 200: logger.debug("Reddit non-200", status=resp.status, url=url[:60]) return None, None - data = await resp.json() + data = await resp.json(content_type=None) post = data[0]["data"]["children"][0]["data"] title = post.get("title", "")