""" ResearchOwl Generators Produces structured outputs from processed research using Claude or Ollama """ import base64 import hashlib import hmac import json import re import time import structlog from src.config import settings from src.processor.processor import OllamaClient, ContentProcessor from src.db.database import ResearchDB, OutputType logger = structlog.get_logger() PODCAST_SYSTEM = ( "Escribe SIEMPRE en español. " "Eres un guionista de podcast. Escribe exactamente como un presentador HABLA — contracciones, " "frases naturales, pausas, preguntas retóricas. " "NUNCA repitas una frase o idea que ya escribiste. " "Cada párrafo debe introducir información NUEVA. " "Usa marcadores [PAUSA], [ÉNFASIS], [MÚSICA] con moderación." ) BLOG_SYSTEM = ( "Escribe SIEMPRE en español. " "Eres un periodista escribiendo un artículo de blog. Usa encabezados markdown claros. " "NUNCA repitas el mismo dato o frase dos veces — si ya lo dijiste, avanza. " "Cada sección debe añadir información nueva no cubierta en secciones anteriores." ) BLOG_SYSTEM_EN = ( "You write ALWAYS in English. " "You are a journalist writing a blog article. Use clear markdown headers. " "NEVER repeat the same fact or phrase twice — if you said it, move on. " "Each section must add new information not covered in other sections." ) BLOG_PROMPT_EN = """\ Write a blog article about: "{topic}" RULES — follow strictly: - Each section under a heading must add NEW information not covered elsewhere - Do NOT summarize previous sections at the start of each new section - Do NOT repeat facts — if a fact appears once, do not mention it again - Use concrete details, numbers, names — avoid vague generalities - Target: 1000-1500 words STRUCTURE: # [Impactful headline] [Hook paragraph — the most surprising fact] ## Background [Context — what, when, who — only facts not covered elsewhere] ## Key Facts [Most significant findings — each point must be distinct] ## Analysis / Significance [What this means — without repeating the Key Facts section] ## Conclusion [No more than 2 sentences summarizing, then a forward-looking statement] RESEARCH MATERIAL: {context} Write the complete article in markdown:""" REPORT_SYSTEM = ( "Escribe SIEMPRE en español. " "Eres un analista de investigación. Escribe un informe estructurado y factual. " "Sé conciso — NO rellenes con resúmenes redundantes. " "NUNCA repitas un hallazgo ya listado. Cada hallazgo numerado debe ser distinto." ) THREAD_SYSTEM = ( "Escribe SIEMPRE en español. " "Escribes hilos de Twitter/X. Cada tweet debe tener menos de 280 caracteres. " "NUNCA repitas información de un tweet anterior. " "Cada tweet debe revelar algo NUEVO. Numéralos 1/N, 2/N..." ) PROMPTS = { OutputType.PODCAST: """\ Escribe un guion de podcast sobre: "{topic}" REGLAS — sigue estrictamente: - Escribe como PALABRA HABLADA: contracciones, ritmo natural, como si hablaras con un amigo - NO uses encabezados formales como "SEGMENTO 1:" — fluye de forma natural - Cada párrafo debe introducir un NUEVO hecho o ángulo — nunca repitas algo ya dicho - Si te encuentras repitiendo, para y salta al siguiente punto nuevo - Objetivo: 800-1200 palabras de contenido hablado real ESTRUCTURA (usa transiciones naturales, no encabezados): 1. Gancho: abre con el hecho más sorprendente o dramático 2. Contexto: ¿cómo llegamos aquí? 3. Las evidencias o eventos clave (elige los 3 más interesantes) 4. La controversia o debate sobre el tema 5. ¿Qué significa esto / qué pasó después? MATERIAL DE INVESTIGACIÓN: {context} Escribe el guion ahora (solo palabra hablada, sin acotaciones excepto [PAUSA] ocasional):""", OutputType.BLOG: """\ Escribe un artículo de blog sobre: "{topic}" REGLAS — sigue estrictamente: - Cada sección bajo un encabezado debe añadir información NUEVA no cubierta en otro lugar - NO resumas secciones anteriores al inicio de cada nueva sección - NO repitas hechos — si un hecho aparece una vez, no lo menciones de nuevo - Usa detalles concretos, números, nombres — evita generalidades vagas - Objetivo: 1000-1500 palabras ESTRUCTURA: # [Titular impactante] [Párrafo gancho — el hecho más sorprendente] ## Contexto [Contexto — qué, cuándo, quién — solo hechos no cubiertos en otro lugar] ## Hechos Clave [Los hallazgos más significativos — cada punto debe ser distinto] ## Análisis / Importancia [Qué significa esto — sin repetir la sección de Hechos Clave] ## Conclusión [Conclusión — no más de 2 oraciones resumiendo, luego una declaración prospectiva] MATERIAL DE INVESTIGACIÓN: {context} Escribe el artículo completo en markdown:""", OutputType.REPORT: """\ Escribe un informe de investigación sobre: "{topic}" REGLAS — sigue estrictamente: - Cada hallazgo numerado debe ser DISTINTO — sin contenido que se superponga - El Resumen Ejecutivo NO debe repetir los hallazgos literalmente — solo los 2-3 puntos más críticos - La calidad de las fuentes y contradicciones deben referenciar afirmaciones específicas, no declaraciones genéricas - Sé preciso y conciso — sin relleno ESTRUCTURA: 1. Resumen Ejecutivo (3-4 oraciones, solo puntos clave) 2. Hallazgos Clave (5-10 numerados, cada uno completamente distinto) 3. Análisis de Evidencia (lo que muestran las fuentes, con cualquier contradicción) 4. Cronología (si aplica — fechas/eventos específicos) 5. Conclusiones y Preguntas Abiertas MATERIAL DE INVESTIGACIÓN: {context} Escribe el informe completo en markdown:""", OutputType.THREAD: """\ Escribe un hilo de Twitter/X sobre: "{topic}" REGLAS — sigue estrictamente: - Cada tweet debe revelar UN nuevo hecho o idea — nunca repetir un tweet anterior - Máximo 280 caracteres por tweet (cuenta cuidadosamente) - Formato de numeración: 1/ 2/ 3/ ... N/ - El tweet gancho debe ser el hecho más sorprendente/provocador - Avanza hacia una conclusión — no repitas el gancho al final - 12-18 tweets en total MATERIAL DE INVESTIGACIÓN: {context} Escribe el hilo (un tweet por línea, nada más):""" } OUTLINE_REPORT = """ IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación. Eres un editor de investigación. Dado el tema "{topic}" y el material disponible, genera un outline detallado para un informe exhaustivo. Devuelve SOLO una lista JSON de secciones, sin texto adicional, sin markdown, sin explicaciones. Formato exacto: [ {{"title": "Título de la sección", "query": "términos de búsqueda específicos para esta sección", "words": 800}}, ... ] Genera entre 6 y 10 secciones. Cada sección debe: - Cubrir un ángulo distinto del tema - Tener una query específica para recuperar chunks relevantes - Indicar longitud objetivo en palabras (400-1200) Material disponible (resumen): {context_summary} """ OUTLINE_BLOG = """ IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación. Eres un editor de contenido. Dado el tema "{topic}" y el material disponible, genera un outline para un artículo de blog exhaustivo. Devuelve SOLO una lista JSON de secciones, sin texto adicional: [ {{"title": "Título de sección", "query": "términos búsqueda", "words": 600}}, ... ] Genera entre 5 y 8 secciones. Primera sección = introducción gancho. Última sección = conclusión con perspectiva original. Material disponible (resumen): {context_summary} """ OUTLINE_PODCAST = """ IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación. Eres un productor de podcast. Dado el tema "{topic}" y el material disponible, genera un outline para un guion de podcast exhaustivo. Devuelve SOLO una lista JSON de segmentos, sin texto adicional: [ {{"title": "Nombre del segmento", "query": "términos búsqueda", "words": 700}}, ... ] Genera entre 5 y 7 segmentos. Flujo natural: gancho → contexto → desarrollo → controversia → conclusión. Material disponible (resumen): {context_summary} """ # ─── Ghost CMS ──────────────────────────────────────────────────────────────── def _b64url(data: bytes | str) -> str: if isinstance(data, str): data = data.encode() return base64.urlsafe_b64encode(data).rstrip(b"=").decode() def _extract_title(content: str) -> str: """Return first H1 heading from markdown, skipping the ResearchOwl header block.""" in_header = False for line in content.splitlines(): stripped = line.strip() if stripped == "---": in_header = not in_header continue if in_header: continue if stripped.startswith("# ") and not stripped.startswith("## "): return stripped[2:].strip() return "" def _strip_researchowl_header(content: str) -> str: """Remove the ---...--- metadata block that ResearchOwl prepends to outputs.""" lines = content.splitlines(keepends=True) dashes_seen = 0 for i, line in enumerate(lines): if line.strip() == "---": dashes_seen += 1 if dashes_seen == 2: return "".join(lines[i + 1:]).lstrip("\n") return content class GhostPublisher: def __init__(self, lang: str = "es"): if lang == "en": self.url = (settings.ghost_url_en or "").rstrip("/") self.api_key = settings.ghost_api_key_en or "" else: self.url = (settings.ghost_url or "").rstrip("/") self.api_key = settings.ghost_api_key or "" def is_configured(self) -> bool: return bool(self.url and self.api_key) def _make_token(self) -> str: key_id, secret = self.api_key.split(":", 1) now = int(time.time()) header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT", "kid": key_id})) payload = _b64url(json.dumps({"iat": now, "exp": now + 300, "aud": "/admin/"})) signing = f"{header}.{payload}" sig = _b64url( hmac.new(bytes.fromhex(secret), signing.encode(), hashlib.sha256).digest() ) return f"{signing}.{sig}" async def publish_draft(self, title: str, markdown_content: str, tags: list[str] | None = None) -> dict: import aiohttp as _aio import markdown as _md clean = _strip_researchowl_header(markdown_content) html = _md.markdown(clean, extensions=["extra"]) # Ghost añade el título automáticamente — eliminar el primer

para evitar duplicado html = re.sub(r"]*>.*?

", "", html, count=1, flags=re.DOTALL).lstrip() logger.info("Ghost publish_draft", html_length=len(html), html_preview=html[:200]) if not html.strip(): raise ValueError("Ghost: HTML vacío tras conversión markdown — contenido no enviado") # Ghost 5.x (Lexical editor): el campo "html" es solo de lectura en la API. # La forma fiable de enviar HTML arbitrario es via mobiledoc con HTML card, # que Ghost acepta en todas las versiones de v5 y renderiza sin conversión. mobiledoc = json.dumps({ "version": "0.3.1", "atoms": [], "cards": [["html", {"html": html}]], "markups": [], "sections": [[10, 0]], }) token = self._make_token() body = { "posts": [{ "title": title, "mobiledoc": mobiledoc, "status": "draft", "tags": [{"name": t} for t in (tags or ["investigacion"])], }] } async with _aio.ClientSession() as sess: async with sess.post( f"{self.url}/ghost/api/admin/posts/", json=body, headers={ "Authorization": f"Ghost {token}", "Accept-Version": "v5.0", }, ) as resp: if resp.status not in (200, 201): text = await resp.text() raise ValueError(f"Ghost API {resp.status}: {text[:300]}") return await resp.json() # ─── Output generation ──────────────────────────────────────────────────────── class OutputGenerator: def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor): self.db = db self.ollama = ollama self.processor = processor async def generate(self, session_id: int, output_type: OutputType, progress_callback=None, lang: str = "es") -> str: """Generate an output for a research session""" if output_type in (OutputType.REPORT_EXTENDED, OutputType.BLOG_EXTENDED, OutputType.PODCAST_EXTENDED): return await self.generate_extended(session_id, output_type, progress_callback, lang=lang) session = await self.db.get_session(session_id) if not session: raise ValueError(f"Session {session_id} not found") topic = session["topic"] logger.info("Generating output", type=output_type, topic=topic) if progress_callback: await progress_callback(f"🔍 Retrieving best research material for {output_type}...") # RAG: get most relevant context for this output type query = self._get_rag_query(output_type, topic) context = await self.processor.rag_query(session_id, query, top_k=80) if not context: # Fallback: use raw top chunks chunks = await self.db.get_top_chunks(session_id, limit=20) context = "\n\n---\n\n".join(c["content"] for c in chunks) if not context: raise ValueError("No processed content available. Run /process first.") backend = "Claude Haiku" if settings.anthropic_api_key else "Ollama" if progress_callback: await progress_callback(f"✍️ Generando {output_type} con {backend}... (2-5 min)") # Build prompt system = self._get_system(output_type) prompt = PROMPTS[output_type].format(topic=topic, context=context) if lang == "en" and output_type == OutputType.BLOG: system = BLOG_SYSTEM_EN prompt = BLOG_PROMPT_EN.format(topic=topic, context=context) output = await self._generate(prompt, system, output_type, session_id) # Add metadata header stats = await self.db.get_session_stats(session_id) header = self._build_header(topic, output_type, session, stats) full_output = header + "\n\n" + output # Save to DB await self.db.save_output(session_id, output_type, full_output) # Auto-publish to Ghost for blog outputs ghost_notice = "" if output_type in (OutputType.BLOG, OutputType.BLOG_EXTENDED): ghost = GhostPublisher(lang=lang) if ghost.is_configured(): try: title = _extract_title(full_output) or topic result = await ghost.publish_draft(title, full_output) post = result["posts"][0] ghost_notice = ( f"\n\n---\n" f"📤 *Borrador publicado en Ghost*\n" f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}" ) logger.info("Auto-published blog to Ghost", post_id=post["id"]) except Exception as e: logger.warning("Auto-publish to Ghost failed", error=str(e)) logger.info("Output generated", type=output_type, length=len(full_output)) return full_output + ghost_notice async def _generate(self, prompt: str, system: str, output_type: OutputType, session_id: int | None = None) -> str: if settings.anthropic_api_key: return await self._generate_with_claude(prompt, system, output_type, session_id) return await self._generate_with_ollama(prompt, system) async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType, session_id: int | None = None) -> str: import anthropic max_tokens = 4096 if output_type == OutputType.THREAD else 16000 try: client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) msg = await client.messages.create( model=settings.claude_model, max_tokens=max_tokens, system=system, messages=[{"role": "user", "content": prompt}], ) if session_id is not None: try: await self.db.log_api_call( session_id, "generation", settings.claude_model, msg.usage.input_tokens, msg.usage.output_tokens ) except Exception as log_err: logger.warning("Failed to log API usage", error=str(log_err)) return msg.content[0].text.strip() except Exception as e: logger.warning("Claude generation failed, falling back to Ollama", error=str(e)) return await self._generate_with_ollama(prompt, system) async def _generate_with_ollama(self, prompt: str, system: str) -> str: return await self.ollama.generate(prompt, system=system, timeout=300, temperature=0.7) def _get_rag_query(self, output_type: OutputType, topic: str) -> str: queries = { OutputType.PODCAST: f"{topic} story narrative facts interesting", OutputType.BLOG: f"{topic} key facts evidence analysis", OutputType.REPORT: f"{topic} evidence data official findings", OutputType.THREAD: f"{topic} surprising facts shocking revelations", } return queries.get(output_type, topic) def _get_system(self, output_type: OutputType) -> str: systems = { OutputType.PODCAST: PODCAST_SYSTEM, OutputType.BLOG: BLOG_SYSTEM, OutputType.REPORT: REPORT_SYSTEM, OutputType.THREAD: THREAD_SYSTEM, } return systems.get(output_type, "You are a helpful research assistant.") async def generate_extended(self, session_id: int, output_type: OutputType, progress_callback=None, lang: str = "es") -> str: """ Generación por secciones para outputs exhaustivos. 1. Recupera muestra de contexto para el outline 2. Genera outline con Claude (lista de secciones) 3. Para cada sección: RAG específico → genera sección 4. Concatena y guarda """ session = await self.db.get_session(session_id) if not session: raise ValueError(f"Session {session_id} not found") topic = session["topic"] # Paso 1: contexto resumen para el outline (top 10 chunks) top_chunks = await self.db.get_top_chunks(session_id, limit=10) if not top_chunks: raise ValueError("No processed content available. Run /process first.") context_summary = "\n\n".join( f"- {c.get('title', '')}: {c['content'][:300]}" for c in top_chunks ) if progress_callback: await progress_callback("🗂️ Generando estructura del documento…") # Paso 2: outline base_type = output_type.value.replace("_extended", "") outline_prompts = { "report": OUTLINE_REPORT, "blog": OUTLINE_BLOG, "podcast": OUTLINE_PODCAST, } outline_prompt = outline_prompts[base_type].format( topic=topic, context_summary=context_summary ) outline_json = await self._generate_raw(outline_prompt, session_id) try: import json as _json clean = outline_json.strip() if clean.startswith("```"): clean = "\n".join(clean.split("\n")[1:]) if clean.endswith("```"): clean = "\n".join(clean.split("\n")[:-1]) sections = _json.loads(clean.strip()) except Exception as e: logger.error("Failed to parse outline", error=str(e), raw=outline_json[:200]) raise ValueError(f"No se pudo generar el outline: {e}") if progress_callback: await progress_callback( f"✍️ Generando {len(sections)} secciones… (esto tardará varios minutos)" ) # Paso 3: generar cada sección base_output_type = OutputType(base_type) system = self._get_system(base_output_type) if lang == "en" and output_type == OutputType.BLOG_EXTENDED: system = BLOG_SYSTEM_EN sections_text = [] for i, section in enumerate(sections, 1): title = section.get("title", f"Sección {i}") query = section.get("query", topic) target_words = section.get("words", 600) if progress_callback: await progress_callback( f"✍️ Sección {i}/{len(sections)}: {title[:40]}…" ) section_context = await self.processor.rag_query(session_id, query, top_k=40) if not section_context: section_context = context_summary lang_rule = "- Write in English\n" if lang == "en" else "- Escribe en español\n" section_prompt = ( f"Escribe la sección '{title}' del {base_type} sobre: '{topic}'\n\n" f"REGLAS:\n" f"- NO incluyas ningún título o encabezado al inicio de tu respuesta — el título de la sección ya está incluido externamente\n" f"- Esta es UNA sección de un documento más largo — no repitas introducción ni conclusión general\n" f"- No incluyas encabezados del documento completo, solo el contenido de esta sección\n" f"- Objetivo: aproximadamente {target_words} palabras\n" f"- Usa SOLO información del material siguiente — no inventes datos\n" f"{lang_rule}" f"\nMATERIAL:\n{section_context}" ) section_text = await self._generate( section_prompt, system, base_output_type, session_id ) sections_text.append(f"## {title}\n\n{section_text}") # Paso 4: concatenar full_content = "\n\n---\n\n".join(sections_text) stats = await self.db.get_session_stats(session_id) header = self._build_header(topic, output_type, session, stats) full_output = header + "\n\n" + full_content await self.db.save_output(session_id, output_type, full_output) # Auto-publish to Ghost for extended blog outputs ghost_notice = "" if output_type == OutputType.BLOG_EXTENDED: ghost = GhostPublisher(lang=lang) if ghost.is_configured(): try: title = _extract_title(full_output) or topic result = await ghost.publish_draft(title, full_output) post = result["posts"][0] ghost_notice = ( f"\n\n---\n" f"📤 *Borrador publicado en Ghost*\n" f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}" ) logger.info("Auto-published extended blog to Ghost", post_id=post["id"]) except Exception as e: logger.warning("Auto-publish to Ghost failed (extended)", error=str(e)) logger.info("Extended output generated", type=output_type, sections=len(sections), length=len(full_output)) return full_output + ghost_notice async def _generate_raw(self, prompt: str, session_id: int | None = None) -> str: if settings.anthropic_api_key: import anthropic try: client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) msg = await client.messages.create( model=settings.claude_model, max_tokens=2048, messages=[{"role": "user", "content": prompt}], ) if session_id is not None: try: await self.db.log_api_call( session_id, "outline", settings.claude_model, msg.usage.input_tokens, msg.usage.output_tokens ) except Exception: pass return msg.content[0].text.strip() except Exception as e: logger.warning("Claude outline failed", error=str(e)) raise raise ValueError("Claude API key required for extended generation") def _build_header(self, topic: str, output_type: OutputType, session: dict, stats: dict) -> str: from datetime import datetime dt = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") return f"""--- ResearchOwl | {output_type.upper()} OUTPUT Topic: {topic} Generated: {dt} Sources: {stats.get('scraped', 0)} scraped | {stats.get('failed', 0)} failed Iterations: {session.get('iterations', 0)} Total words researched: {session.get('total_words', 0):,} --- """ def _remove_duplicate_headings(text: str) -> str: lines = text.split('\n') result = [] i = 0 while i < len(lines): line = lines[i].rstrip() if line.startswith('# ') and not line.startswith('## '): h1_text = line[2:].strip().lower() window = result[-5:] if len(result) >= 5 else result[:] recent_h2s = { p.strip()[3:].strip().lower() for p in window if p.strip().startswith('## ') } if h1_text in recent_h2s: i += 1 continue result.append(lines[i]) i += 1 return '\n'.join(result) def generate_pdf(content: str, title: str = "ResearchOwl Output") -> bytes: content = _remove_duplicate_headings(content) try: from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import cm from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, HRFlowable from reportlab.lib.enums import TA_LEFT from reportlab.lib import colors import io import re except ImportError: raise ImportError("reportlab is required for PDF export — pip install reportlab") buf = io.BytesIO() doc = SimpleDocTemplate( buf, pagesize=A4, rightMargin=2 * cm, leftMargin=2 * cm, topMargin=2.5 * cm, bottomMargin=2 * cm, title=title, ) base = getSampleStyleSheet() normal = ParagraphStyle("RO_Normal", parent=base["Normal"], fontSize=10, leading=14, spaceAfter=4) h1 = ParagraphStyle("RO_H1", parent=base["Heading1"], fontSize=18, spaceBefore=12, spaceAfter=6, textColor=colors.HexColor("#1a1a2e")) h2 = ParagraphStyle("RO_H2", parent=base["Heading2"], fontSize=14, spaceBefore=10, spaceAfter=4, textColor=colors.HexColor("#16213e")) h3 = ParagraphStyle("RO_H3", parent=base["Heading3"], fontSize=12, spaceBefore=8, spaceAfter=4) code_style = ParagraphStyle("RO_Code", parent=base["Code"], fontSize=9, leading=12, fontName="Courier", backColor=colors.HexColor("#f4f4f4"), spaceAfter=4) bullet_style = ParagraphStyle("RO_Bullet", parent=normal, leftIndent=20, bulletIndent=10, spaceAfter=2) def md_to_para(text: str) -> str: text = text.replace("&", "&").replace("<", "<").replace(">", ">") text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) text = re.sub(r'__(.+?)__', r'\1', text) text = re.sub(r'\*(.+?)\*', r'\1', text) text = re.sub(r'_(.+?)_', r'\1', text) text = re.sub(r'`(.+?)`', r'\1', text) return text story = [] lines = content.split("\n") in_code = False code_buf = [] for line in lines: if line.startswith("```"): if not in_code: in_code = True code_buf = [] else: in_code = False try: story.append(Paragraph( "
".join(l.replace("&", "&").replace("<", "<").replace(">", ">") for l in code_buf), code_style )) except Exception: pass continue if in_code: code_buf.append(line) continue if re.match(r'^[-*_]{3,}$', line.strip()): story.append(HRFlowable(width="100%", thickness=0.5, color=colors.grey, spaceAfter=6)) continue try: if line.startswith("### "): story.append(Paragraph(md_to_para(line[4:]), h3)) elif line.startswith("## "): story.append(Paragraph(md_to_para(line[3:]), h2)) elif line.startswith("# "): story.append(Paragraph(md_to_para(line[2:]), h1)) elif re.match(r'^[-*+] ', line): story.append(Paragraph("• " + md_to_para(line[2:]), bullet_style)) elif re.match(r'^\d+\. ', line): story.append(Paragraph(md_to_para(line), bullet_style)) elif line.strip() == "": story.append(Spacer(1, 6)) else: story.append(Paragraph(md_to_para(line), normal)) except Exception: try: story.append(Paragraph(line[:300], normal)) except Exception: pass doc.build(story) return buf.getvalue() async def generate_diff_summary( topic: str, new_urls: set, old_urls: set, new_chunks: list, session_id: int, db, ) -> str | None: from src.config import settings import structlog diff_logger = structlog.get_logger() added_urls = new_urls - old_urls pct_new = len(added_urls) / max(len(new_urls), 1) diff_logger.info("Diff analysis", topic=topic, new_urls=len(new_urls), old_urls=len(old_urls), added=len(added_urls), pct_new=round(pct_new, 2)) if pct_new < 0.20 and len(added_urls) < 5: diff_logger.info("Diff: no significant new sources", topic=topic) return None if not new_chunks: return None context = "\n\n---\n\n".join( f"[{c.get('source_type', 'web').upper()}] {c.get('title', '')}\n{c['content'][:400]}" for c in new_chunks[:20] ) if not settings.anthropic_api_key: return ( f"📊 *Novedades detectadas sobre {topic}*\n\n" f"• {len(added_urls)} fuentes nuevas encontradas\n" f"• {len(new_chunks)} chunks de contenido procesados\n\n" f"Usa /generate report para ver el análisis completo." ) try: import anthropic client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) prompt = ( f'Analiza el siguiente material de investigación sobre "{topic}" ' f'y genera un resumen BREVE (máximo 300 palabras) de las novedades ' f'más importantes encontradas. Escribe en español.\n\n' f'Si el contenido es muy similar a investigaciones anteriores o no ' f'contiene información genuinamente nueva, responde SOLO con: ' f'"SIN_NOVEDADES"\n\n' f'Material nuevo:\n{context}' ) msg = await client.messages.create( model=settings.claude_model, max_tokens=500, messages=[{"role": "user", "content": prompt}] ) summary = msg.content[0].text.strip() if summary == "SIN_NOVEDADES": diff_logger.info("Diff: Claude found no new information", topic=topic) return None try: await db.log_api_call(session_id, "diff", settings.claude_model, msg.usage.input_tokens, msg.usage.output_tokens) except Exception: pass return f"🔔 *Novedades — {topic}*\n\n{summary}\n\nUsa /generate para report completo." except Exception as e: diff_logger.warning("Diff summary generation failed", error=str(e)) return ( f"📊 *Actualización — {topic}*\n\n" f"• {len(added_urls)} fuentes nuevas\n" f"Usa /generate report para ver el análisis completo." ) async def generate_comparison( topic_a: str, topic_b: str, context_a: str, context_b: str, session_id_a: int, db, ) -> str: from src.config import settings import structlog cmp_logger = structlog.get_logger() if not settings.anthropic_api_key: raise ValueError("Claude API key required for comparison") def _truncate(text: str, max_words: int = 3000) -> str: words = text.split() if len(words) > max_words: return " ".join(words[:max_words]) + "\n\n[... contenido adicional truncado ...]" return text context_a = _truncate(context_a) context_b = _truncate(context_b) prompt = ( f'Eres un analista experto. Compara en profundidad estos dos temas:\n' f'TEMA A: "{topic_a}"\n' f'TEMA B: "{topic_b}"\n\n' f'Escribe el análisis en español con esta estructura:\n\n' f'## Resumen comparativo\n' f'(2-3 párrafos con las diferencias y similitudes más importantes)\n\n' f'## {topic_a}\n' f'(Puntos clave únicos de este tema)\n\n' f'## {topic_b}\n' f'(Puntos clave únicos de este tema)\n\n' f'## Similitudes\n' f'(Qué tienen en común)\n\n' f'## Diferencias clave\n' f'(Tabla markdown o lista de las diferencias más relevantes)\n\n' f'## Conclusión\n' f'(Cuál es mejor/más relevante según el contexto, o qué conclusión se extrae)\n\n' f'---\n' f'MATERIAL DE INVESTIGACIÓN — {topic_a}:\n{context_a}\n\n' f'---\n' f'MATERIAL DE INVESTIGACIÓN — {topic_b}:\n{context_b}\n' ) try: import anthropic client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) msg = await client.messages.create( model=settings.claude_model, max_tokens=8192, messages=[{"role": "user", "content": prompt}] ) try: await db.log_api_call( session_id_a, "comparison", settings.claude_model, msg.usage.input_tokens, msg.usage.output_tokens ) except Exception: pass return msg.content[0].text.strip() except Exception as e: cmp_logger.error("Comparison generation failed", error=str(e)) raise