Files
researchowl/src/generator/generator.py
T
ChemaVX b33ae202b8
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
feat: trackeo de coste por llamada Claude — tabla api_usage + /costs
2026-05-03 20:06:06 +00:00

260 lines
10 KiB
Python

"""
ResearchOwl Generators
Produces structured outputs from processed research using Claude or Ollama
"""
import structlog
from src.config import settings
from src.processor.processor import OllamaClient, ContentProcessor
from src.db.database import ResearchDB, OutputType
logger = structlog.get_logger()
PODCAST_SYSTEM = (
"Escribe SIEMPRE en español. "
"Eres un guionista de podcast. Escribe exactamente como un presentador HABLA — contracciones, "
"frases naturales, pausas, preguntas retóricas. "
"NUNCA repitas una frase o idea que ya escribiste. "
"Cada párrafo debe introducir información NUEVA. "
"Usa marcadores [PAUSA], [ÉNFASIS], [MÚSICA] con moderación."
)
BLOG_SYSTEM = (
"Escribe SIEMPRE en español. "
"Eres un periodista escribiendo un artículo de blog. Usa encabezados markdown claros. "
"NUNCA repitas el mismo dato o frase dos veces — si ya lo dijiste, avanza. "
"Cada sección debe añadir información nueva no cubierta en secciones anteriores."
)
REPORT_SYSTEM = (
"Escribe SIEMPRE en español. "
"Eres un analista de investigación. Escribe un informe estructurado y factual. "
"Sé conciso — NO rellenes con resúmenes redundantes. "
"NUNCA repitas un hallazgo ya listado. Cada hallazgo numerado debe ser distinto."
)
THREAD_SYSTEM = (
"Escribe SIEMPRE en español. "
"Escribes hilos de Twitter/X. Cada tweet debe tener menos de 280 caracteres. "
"NUNCA repitas información de un tweet anterior. "
"Cada tweet debe revelar algo NUEVO. Numéralos 1/N, 2/N..."
)
PROMPTS = {
OutputType.PODCAST: """\
Escribe un guion de podcast sobre: "{topic}"
REGLAS — sigue estrictamente:
- Escribe como PALABRA HABLADA: contracciones, ritmo natural, como si hablaras con un amigo
- NO uses encabezados formales como "SEGMENTO 1:" — fluye de forma natural
- Cada párrafo debe introducir un NUEVO hecho o ángulo — nunca repitas algo ya dicho
- Si te encuentras repitiendo, para y salta al siguiente punto nuevo
- Objetivo: 800-1200 palabras de contenido hablado real
ESTRUCTURA (usa transiciones naturales, no encabezados):
1. Gancho: abre con el hecho más sorprendente o dramático
2. Contexto: ¿cómo llegamos aquí?
3. Las evidencias o eventos clave (elige los 3 más interesantes)
4. La controversia o debate sobre el tema
5. ¿Qué significa esto / qué pasó después?
MATERIAL DE INVESTIGACIÓN:
{context}
Escribe el guion ahora (solo palabra hablada, sin acotaciones excepto [PAUSA] ocasional):""",
OutputType.BLOG: """\
Escribe un artículo de blog sobre: "{topic}"
REGLAS — sigue estrictamente:
- Cada sección bajo un encabezado debe añadir información NUEVA no cubierta en otro lugar
- NO resumas secciones anteriores al inicio de cada nueva sección
- NO repitas hechos — si un hecho aparece una vez, no lo menciones de nuevo
- Usa detalles concretos, números, nombres — evita generalidades vagas
- Objetivo: 1000-1500 palabras
ESTRUCTURA:
# [Titular impactante]
[Párrafo gancho — el hecho más sorprendente]
## Contexto
[Contexto — qué, cuándo, quién — solo hechos no cubiertos en otro lugar]
## Hechos Clave
[Los hallazgos más significativos — cada punto debe ser distinto]
## Análisis / Importancia
[Qué significa esto — sin repetir la sección de Hechos Clave]
## Conclusión
[Conclusión — no más de 2 oraciones resumiendo, luego una declaración prospectiva]
MATERIAL DE INVESTIGACIÓN:
{context}
Escribe el artículo completo en markdown:""",
OutputType.REPORT: """\
Escribe un informe de investigación sobre: "{topic}"
REGLAS — sigue estrictamente:
- Cada hallazgo numerado debe ser DISTINTO — sin contenido que se superponga
- El Resumen Ejecutivo NO debe repetir los hallazgos literalmente — solo los 2-3 puntos más críticos
- La calidad de las fuentes y contradicciones deben referenciar afirmaciones específicas, no declaraciones genéricas
- Sé preciso y conciso — sin relleno
ESTRUCTURA:
1. Resumen Ejecutivo (3-4 oraciones, solo puntos clave)
2. Hallazgos Clave (5-10 numerados, cada uno completamente distinto)
3. Análisis de Evidencia (lo que muestran las fuentes, con cualquier contradicción)
4. Cronología (si aplica — fechas/eventos específicos)
5. Conclusiones y Preguntas Abiertas
MATERIAL DE INVESTIGACIÓN:
{context}
Escribe el informe completo en markdown:""",
OutputType.THREAD: """\
Escribe un hilo de Twitter/X sobre: "{topic}"
REGLAS — sigue estrictamente:
- Cada tweet debe revelar UN nuevo hecho o idea — nunca repetir un tweet anterior
- Máximo 280 caracteres por tweet (cuenta cuidadosamente)
- Formato de numeración: 1/ 2/ 3/ ... N/
- El tweet gancho debe ser el hecho más sorprendente/provocador
- Avanza hacia una conclusión — no repitas el gancho al final
- 12-18 tweets en total
MATERIAL DE INVESTIGACIÓN:
{context}
Escribe el hilo (un tweet por línea, nada más):"""
}
class OutputGenerator:
def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor):
self.db = db
self.ollama = ollama
self.processor = processor
async def generate(self, session_id: int, output_type: OutputType,
progress_callback=None) -> str:
"""Generate an output for a research session"""
session = await self.db.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
topic = session["topic"]
logger.info("Generating output", type=output_type, topic=topic)
if progress_callback:
await progress_callback(f"🔍 Retrieving best research material for {output_type}...")
# RAG: get most relevant context for this output type
query = self._get_rag_query(output_type, topic)
context = await self.processor.rag_query(session_id, query, top_k=30)
if not context:
# Fallback: use raw top chunks
chunks = await self.db.get_top_chunks(session_id, limit=20)
context = "\n\n---\n\n".join(c["content"] for c in chunks)
if not context:
raise ValueError("No processed content available. Run /process first.")
# Truncate context to avoid Ollama context limits
context_words = context.split()
if len(context_words) > 6000:
context = " ".join(context_words[:6000]) + "\n\n[... additional material truncated ...]"
backend = "Claude Haiku" if settings.anthropic_api_key else "Ollama"
if progress_callback:
await progress_callback(f"✍️ Generando {output_type} con {backend}... (2-5 min)")
# Build prompt
system = self._get_system(output_type)
prompt = PROMPTS[output_type].format(topic=topic, context=context)
output = await self._generate(prompt, system, output_type, session_id)
# Add metadata header
stats = await self.db.get_session_stats(session_id)
header = self._build_header(topic, output_type, session, stats)
full_output = header + "\n\n" + output
# Save to DB
await self.db.save_output(session_id, output_type, full_output)
logger.info("Output generated", type=output_type, length=len(full_output))
return full_output
async def _generate(self, prompt: str, system: str, output_type: OutputType,
session_id: int | None = None) -> str:
if settings.anthropic_api_key:
return await self._generate_with_claude(prompt, system, output_type, session_id)
return await self._generate_with_ollama(prompt, system)
async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType,
session_id: int | None = None) -> str:
import anthropic
max_tokens = 4096 if output_type == OutputType.THREAD else 8192
try:
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
msg = await client.messages.create(
model=settings.claude_model,
max_tokens=max_tokens,
system=system,
messages=[{"role": "user", "content": prompt}],
)
if session_id is not None:
try:
await self.db.log_api_call(
session_id, "generation", settings.claude_model,
msg.usage.input_tokens, msg.usage.output_tokens
)
except Exception as log_err:
logger.warning("Failed to log API usage", error=str(log_err))
return msg.content[0].text.strip()
except Exception as e:
logger.warning("Claude generation failed, falling back to Ollama", error=str(e))
return await self._generate_with_ollama(prompt, system)
async def _generate_with_ollama(self, prompt: str, system: str) -> str:
return await self.ollama.generate(prompt, system=system, timeout=300, temperature=0.7)
def _get_rag_query(self, output_type: OutputType, topic: str) -> str:
queries = {
OutputType.PODCAST: f"{topic} story narrative facts interesting",
OutputType.BLOG: f"{topic} key facts evidence analysis",
OutputType.REPORT: f"{topic} evidence data official findings",
OutputType.THREAD: f"{topic} surprising facts shocking revelations",
}
return queries.get(output_type, topic)
def _get_system(self, output_type: OutputType) -> str:
systems = {
OutputType.PODCAST: PODCAST_SYSTEM,
OutputType.BLOG: BLOG_SYSTEM,
OutputType.REPORT: REPORT_SYSTEM,
OutputType.THREAD: THREAD_SYSTEM,
}
return systems.get(output_type, "You are a helpful research assistant.")
def _build_header(self, topic: str, output_type: OutputType,
session: dict, stats: dict) -> str:
from datetime import datetime
dt = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
return f"""---
ResearchOwl | {output_type.upper()} OUTPUT
Topic: {topic}
Generated: {dt}
Sources: {stats.get('scraped', 0)} scraped | {stats.get('failed', 0)} failed
Iterations: {session.get('iterations', 0)}
Total words researched: {session.get('total_words', 0):,}
---
"""