feat: dedup semántico antes del scoring — hash MD5 + similitud Jaccard
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
This commit is contained in:
@@ -129,6 +129,8 @@ class ContentProcessor:
|
|||||||
scraped = [s for s in sources if s["status"] == "scraped"]
|
scraped = [s for s in sources if s["status"] == "scraped"]
|
||||||
|
|
||||||
logger.info("Processing sources", total=len(scraped))
|
logger.info("Processing sources", total=len(scraped))
|
||||||
|
scraped = await self._dedup_sources(session_id, scraped)
|
||||||
|
logger.info("After dedup", unique=len(scraped))
|
||||||
total_chunks = 0
|
total_chunks = 0
|
||||||
total_words = 0
|
total_words = 0
|
||||||
|
|
||||||
@@ -161,6 +163,56 @@ class ContentProcessor:
|
|||||||
|
|
||||||
return {"total_chunks": total_chunks, "total_words": total_words}
|
return {"total_chunks": total_chunks, "total_words": total_words}
|
||||||
|
|
||||||
|
async def _dedup_sources(self, session_id: int,
|
||||||
|
scraped: list[dict]) -> list[dict]:
|
||||||
|
try:
|
||||||
|
import hashlib
|
||||||
|
seen_hashes: set = set()
|
||||||
|
seen_prefixes: list = []
|
||||||
|
unique: list = []
|
||||||
|
duplicates = 0
|
||||||
|
|
||||||
|
for source in scraped:
|
||||||
|
content = await self.db.get_source_content(source["id"])
|
||||||
|
if not content:
|
||||||
|
unique.append(source)
|
||||||
|
continue
|
||||||
|
|
||||||
|
content_hash = hashlib.md5(content[:2000].encode()).hexdigest()
|
||||||
|
if content_hash in seen_hashes:
|
||||||
|
duplicates += 1
|
||||||
|
await self.db.update_source(source["id"], status="skipped")
|
||||||
|
continue
|
||||||
|
seen_hashes.add(content_hash)
|
||||||
|
|
||||||
|
prefix = content[:300].strip().lower()
|
||||||
|
prefix_words = set(prefix.split())
|
||||||
|
is_dup = False
|
||||||
|
if len(prefix_words) >= 10:
|
||||||
|
for seen_prefix_words in seen_prefixes:
|
||||||
|
intersection = len(prefix_words & seen_prefix_words)
|
||||||
|
union = len(prefix_words | seen_prefix_words)
|
||||||
|
if intersection / max(union, 1) > 0.85:
|
||||||
|
is_dup = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if is_dup:
|
||||||
|
duplicates += 1
|
||||||
|
await self.db.update_source(source["id"], status="skipped")
|
||||||
|
continue
|
||||||
|
|
||||||
|
seen_prefixes.append(prefix_words)
|
||||||
|
unique.append(source)
|
||||||
|
|
||||||
|
if duplicates > 0:
|
||||||
|
logger.info("Dedup complete", session_id=session_id,
|
||||||
|
original=len(scraped), duplicates=duplicates,
|
||||||
|
unique=len(unique))
|
||||||
|
return unique
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Dedup failed, processing all sources", error=str(e))
|
||||||
|
return scraped
|
||||||
|
|
||||||
async def _process_source(self, session_id: int, topic: str, source: dict) -> int:
|
async def _process_source(self, session_id: int, topic: str, source: dict) -> int:
|
||||||
"""Chunk, score, embed and store a single source. Returns chunk count."""
|
"""Chunk, score, embed and store a single source. Returns chunk count."""
|
||||||
source_id = source["id"]
|
source_id = source["id"]
|
||||||
|
|||||||
Reference in New Issue
Block a user