diff --git a/src/processor/processor.py b/src/processor/processor.py index 7cb9125..9b3db7f 100644 --- a/src/processor/processor.py +++ b/src/processor/processor.py @@ -129,6 +129,8 @@ class ContentProcessor: scraped = [s for s in sources if s["status"] == "scraped"] logger.info("Processing sources", total=len(scraped)) + scraped = await self._dedup_sources(session_id, scraped) + logger.info("After dedup", unique=len(scraped)) total_chunks = 0 total_words = 0 @@ -161,6 +163,56 @@ class ContentProcessor: return {"total_chunks": total_chunks, "total_words": total_words} + async def _dedup_sources(self, session_id: int, + scraped: list[dict]) -> list[dict]: + try: + import hashlib + seen_hashes: set = set() + seen_prefixes: list = [] + unique: list = [] + duplicates = 0 + + for source in scraped: + content = await self.db.get_source_content(source["id"]) + if not content: + unique.append(source) + continue + + content_hash = hashlib.md5(content[:2000].encode()).hexdigest() + if content_hash in seen_hashes: + duplicates += 1 + await self.db.update_source(source["id"], status="skipped") + continue + seen_hashes.add(content_hash) + + prefix = content[:300].strip().lower() + prefix_words = set(prefix.split()) + is_dup = False + if len(prefix_words) >= 10: + for seen_prefix_words in seen_prefixes: + intersection = len(prefix_words & seen_prefix_words) + union = len(prefix_words | seen_prefix_words) + if intersection / max(union, 1) > 0.85: + is_dup = True + break + + if is_dup: + duplicates += 1 + await self.db.update_source(source["id"], status="skipped") + continue + + seen_prefixes.append(prefix_words) + unique.append(source) + + if duplicates > 0: + logger.info("Dedup complete", session_id=session_id, + original=len(scraped), duplicates=duplicates, + unique=len(unique)) + return unique + except Exception as e: + logger.warning("Dedup failed, processing all sources", error=str(e)) + return scraped + async def _process_source(self, session_id: int, topic: str, source: dict) -> int: """Chunk, score, embed and store a single source. Returns chunk count.""" source_id = source["id"]