Observabilidad — Guía técnica
Arquitectura completa del pipeline de observabilidad: SDK de la API → Kafka → Logstash → Elasticsearch → Analytics Dashboard.
Arquitectura general
Componentes y puertos
| Servicio | Puerto externo | Puerto interno | URL pública |
|---|---|---|---|
| Kafka Broker | 9092 | 29092 | — |
| Kafka UI | 8280 | 8080 | https://kafka.zentto.net |
| Elasticsearch | 9200 | 9200 | — |
| Kibana | 5601 | 5601 | https://kibana.zentto.net |
| Logstash | — | — | — |
| APM Server | 8200 | 8200 | — |
Kafka — Topics y formato de mensajes
La API se conecta a Kafka usando la IP del gateway Docker (172.18.0.1:9092).
El listener PLAINTEXT_HOST del broker anuncia esta misma IP para que la conexión funcione
tanto desde la API (contenedor Docker) como desde el host.
Regla crítica: El campo "topic" dentro del JSON del mensaje debe coincidir
exactamente con el nombre del índice que Logstash creará en Elasticsearch. Logstash usa
%{[topic]}-%{+YYYY.MM.dd} para nombrar los índices.
Topics activos
| Topic Kafka | Índice ES resultante | Retención | Generado por |
|---|---|---|---|
| zentto-api-logs | zentto-api-logs-YYYY.MM.dd | 30 días (ILM) | middleware automático |
| zentto-api-errors | zentto-api-errors-YYYY.MM.dd | 90 días | obs.error() |
| zentto-api-audit | zentto-api-audit-YYYY.MM.dd | 365 días | obs.audit() |
| zentto-api-performance | zentto-api-performance-YYYY.MM.dd | 30 días | middleware / obs.perf() |
| zentto-api-events | zentto-api-events-YYYY.MM.dd | 90 días | obs.event() |
| zentto-docker-logs | zentto-docker-logs-YYYY.MM.dd | 14 días | Filebeat |
Formato del mensaje: zentto-api-logs
{
"@timestamp": "2026-03-22T15:30:00.000Z", // ISO 8601 UTC
"timestamp": "2026-03-22T15:30:00.000Z", // duplicado para compatibilidad
"topic": "zentto-api-logs", // DEBE coincidir con el topic Kafka
"service": "zentto-api",
"log_level": "info",
"type": "http",
"method": "POST",
"path": "/v1/facturas",
"statusCode": 201,
"durationMs": 120,
"userId": "10", // string (sub del JWT)
"companyId": 1, // integer
"ip": "203.0.113.5",
"userAgent": "Mozilla/5.0 ..."
} Formato del mensaje: zentto-api-events
{
"@timestamp": "2026-03-22T15:30:00.000Z",
"timestamp": "2026-03-22T15:30:00.000Z",
"topic": "zentto-api-events",
"service": "zentto-api",
"log_level": "info",
"event": "invoice.created", // nombre del evento de negocio
"userId": "10",
"companyId": 1,
"path": "/v1/facturas" // ruta que originó el evento
} Formato del mensaje: zentto-api-audit
{
"@timestamp": "2026-03-22T15:30:00.000Z",
"timestamp": "2026-03-22T15:30:00.000Z",
"topic": "zentto-api-audit",
"service": "zentto-api",
"log_level": "audit",
"action": "UPDATE_PRICE", // SCREAMING_SNAKE_CASE
"userId": "10",
"userName": "vendedor1",
"companyId": 1,
"module": "inventario",
"entity": "Articulo",
"entityId": "4523",
"ip": "203.0.113.5",
"before": { "precio": 10.00 }, // estado anterior (opcional)
"after": { "precio": 15.00 } // estado nuevo (opcional)
} Formato del mensaje: zentto-api-performance
{
"@timestamp": "2026-03-22T15:30:00.000Z",
"timestamp": "2026-03-22T15:30:00.000Z",
"topic": "zentto-api-performance",
"service": "zentto-api",
"log_level": "perf",
"type": "slow-request",
"path": "/v1/contabilidad/reportes/balance-general",
"method": "GET",
"statusCode": 200,
"durationMs": 1850,
"companyId": 1
} Logstash — Pipeline
El pipeline de Logstash está en el servidor en
/opt/zentto-observability/logstash/pipeline/zentto.conf.
Regla de naming de índices
Logstash nombra el índice usando el campo topic del mensaje JSON:
# zentto.conf — output section
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "%{[topic]}-%{+YYYY.MM.dd}" // usa el campo "topic" del JSON
}
} ⚠️ Problema frecuente: Si el campo topic ya contiene el prefijo
del índice (p.ej. "topic": "zentto-api-logs") y el template del output usa
"zentto-%{[topic]}", se crea un índice con doble prefijo:
zentto-zentto-api-logs-2026.03.22. Verificar siempre que el output sea
%{[topic]}-%{+YYYY.MM.dd} sin prefijo adicional.
Elasticsearch — Index Templates
Los index templates definen el mapping de campos para los índices nuevos.
Sin templates, Elasticsearch aplica dynamic mapping que puede crear conflictos de tipos
(p.ej. userId como long en vez de keyword).
Script de setup: zentto-infra/observability/elasticsearch/setup-index-templates.sh
# Aplicar todos los templates (desde el servidor o con ES expuesto)
bash setup-index-templates.sh http://localhost:9200 Templates definidos
| Template | Patrón de índice | Propósito |
|---|---|---|
| zentto-api-logs | zentto-api-logs-* | HTTP request logs |
| zentto-api-errors | zentto-api-errors-* | Errores con stack trace |
| zentto-api-audit | zentto-api-audit-* | Auditoría (quién hizo qué) |
| zentto-api-performance | zentto-api-performance-* | Endpoints lentos |
| zentto-api-events | zentto-api-events-* | Eventos de negocio |
Mapping de campos clave (zentto-api-logs)
| Campo | Tipo ES | Nota |
|---|---|---|
| @timestamp | date | Campo principal de tiempo |
| method | keyword | GET, POST, PUT, DELETE |
| path | keyword | Ruta exacta del endpoint |
| statusCode | integer | Código HTTP |
| durationMs | long | Duración en milisegundos |
| userId | keyword | Sub del JWT (string) |
| companyId | integer | ID de empresa |
| ip | ip | ignore_malformed: true |
| service | keyword | zentto-api |
| topic | keyword | Nombre del topic Kafka |
⚠️ Gotcha: tipos keyword vs .keyword
Cuando un campo está mapeado como keyword (tipo puro), no existe el
sub-campo .keyword. El sub-campo .keyword solo existe cuando el campo es
de tipo text con multi-field mapping.
En queries de aggregation, usar siempre el nombre directo:
// ✅ Correcto — campo keyword puro
{ "terms": { "field": "path" } }
{ "terms": { "field": "event" } }
{ "terms": { "field": "action" } }
// ❌ Incorrecto — .keyword no existe en campos keyword puros
{ "terms": { "field": "path.keyword" } } ILM Policy — Retención automática
El template zentto-api-logs usa la policy zentto-logs-policy:
- hot phase: rollover cada 1 día o 5 GB
- delete phase: eliminar después de 30 días
Verificar templates activos
# Listar todos los templates zentto-*
curl http://localhost:9200/_index_template?pretty | grep zentto
# Ver template específico
curl http://localhost:9200/_index_template/zentto-api-logs?pretty
# Ver índices activos y cantidad de docs
curl http://localhost:9200/_cat/indices/zentto-*?v&s=index
# Ver ILM policy
curl http://localhost:9200/_ilm/policy/zentto-logs-policy?pretty SDK de observabilidad
El SDK está en web/api/src/modules/integrations/observability.ts y exporta el objeto obs.
Importar
import { obs } from '../modules/integrations/observability.js'; obs.log() — Logs informativos
obs.log('info', 'Factura creada', { facturaId: 123, companyId: 1 });
obs.log('warn', 'Stock bajo para artículo', { articuloId: 456, stock: 2 });
obs.log('debug', 'Query ejecutada', { sp: 'usp_doc_Factura_List', durationMs: 45 }); obs.error() — Errores con stack trace
try {
await someOperation();
} catch (err) {
obs.error(err, { module: 'ventas', operation: 'createInvoice', userId: 5 });
throw err;
} obs.audit() — Auditoría
obs.audit('UPDATE_PRICE', {
userId: req.user.sub, // usar sub del JWT, no userId
userName: req.user.userName,
companyId: req.user.companyId,
module: 'inventario',
entity: 'Articulo',
entityId: articuloId,
before: { precio: 10.00 },
after: { precio: 15.00 },
ip: req.ip,
}); obs.perf() — Performance de operaciones
const start = Date.now();
const result = await callSp('usp_doc_Factura_List', params);
obs.perf('sp.usp_doc_Factura_List', Date.now() - start, {
companyId,
rowCount: result.recordset.length,
}); obs.event() — Eventos de negocio
obs.event('invoice.created', { companyId, invoiceId, total: 1500.00, currency: 'USD' });
obs.event('lead.won', { companyId, leadId, value: 25000, pipeline: 'CORP' });
obs.event('payroll.processed', { companyId, employeeCount: 45, totalAmount: 85000 }); Middleware automático
El middleware en middleware/observability.ts está registrado en app.ts
después de morgan y cubre automáticamente todos los endpoints:
- Cada HTTP request →
zentto-api-logs(método, ruta, status, duración, userId, companyId, IP) - Requests lentos (>1s) →
zentto-api-performance - Errores 5xx →
zentto-api-errors - POST exitosos en rutas conocidas →
zentto-api-events
Nota sobre userId: El middleware extrae el userId del campo sub del JWT
(no de userId). El valor se almacena como string en Elasticsearch para coincidir con el
tipo keyword del index template.
Eventos detectados automáticamente
| Ruta (startsWith) | Evento generado |
|---|---|
| /v1/auth/login | user.login |
| /v1/facturas | invoice.created |
| /v1/compras | purchase.created |
| /v1/clientes | customer.created |
| /v1/proveedores | vendor.created |
| /v1/pagos | payment.created |
| /v1/nomina/procesar | payroll.processed |
| /v1/pos | pos.sale |
| /v1/restaurante | restaurant.order |
| /v1/crm/leads | crm.lead.created |
| /v1/inventario | inventory.movement |
| /v1/contabilidad/asientos | accounting.entry.created |
| /v1/support/ticket | support.ticket.created |
Analytics API — Endpoints
Todos los endpoints están en web/api/src/modules/integrations/analytics.routes.ts
y se montan bajo /v1/analytics. Requieren autenticación JWT.
GET /v1/analytics/dashboard
KPIs principales: requests totales, usuarios únicos, latencia promedio, tasa de error.
| Query param | Valores | Default |
|---|---|---|
| range | 1h | 24h | 7d | 30d | 90d | 24h |
Índices consultados: zentto-api-logs-*, zentto-api-events-*
// Respuesta
{
"ok": true,
"range": "7d",
"kpis": {
"totalRequests": 1243,
"uniqueUsers": 5,
"avgLatencyMs": 87,
"errorCount": 3,
"errorRate": "0.24"
},
"charts": {
"requestsOverTime": [ { "date": "2026-03-15T00:00:00.000Z", "count": 145 } ],
"topEndpoints": [ { "path": "/v1/facturas", "count": 312 } ],
"statusCodes": [ { "code": 200, "count": 1200 }, { "code": 201, "count": 40 } ],
"eventsByType": [ { "event": "invoice.created", "count": 89 } ]
}
} GET /v1/analytics/activity
Actividad de usuarios: módulos más usados, heatmap por hora.
| Query param | Valores | Default |
|---|---|---|
| range | Cualquier valor ES (7d, 30d, etc.) | 7d |
Índices consultados: zentto-api-logs-*
Implementación: Usa dos queries paralelas (Promise.allSettled).
La primera obtiene by_module y activity_heatmap.
La segunda obtiene by_user (puede fallar en entornos con índices pre-template
que tienen userId como tipo conflictivo — falla de forma aislada sin afectar los demás datos).
// Respuesta
{
"ok": true,
"moduleUsage": [ { "path": "/v1/facturas", "count": 312 } ],
"heatmap": [ { "hour": "2026-03-22T10:00:00.000Z", "requests": 45, "users": 3 } ],
"users": [ { "userId": "10", "requestCount": 89, "lastSeen": "...", "topModules": ["/v1/facturas"] } ]
} GET /v1/analytics/business
Eventos de negocio: facturas, compras, pagos, ventas POS, leads CRM.
| Query param | Valores | Default |
|---|---|---|
| range | Cualquier valor ES | 30d |
Índices consultados: zentto-api-events-*
// Respuesta
{
"ok": true,
"summary": {
"invoices": 89, "purchases": 34, "payments": 120,
"newCustomers": 12, "posSales": 205, "leadsCreated": 18
},
"invoicesTrend": [ { "date": "2026-03-15T00:00:00.000Z", "count": 8 } ],
"allEvents": [ { "event": "pos.sale", "total": 205, "trend": [...] } ]
} GET /v1/analytics/performance
Rendimiento de endpoints: percentiles de latencia, endpoints más lentos, errores por ruta.
| Query param | Valores | Default |
|---|---|---|
| range | 24h | 7d | 30d | 24h |
Índices consultados: zentto-api-logs-*, zentto-api-performance-*
// Respuesta
{
"ok": true,
"percentiles": { "p50": 45, "p75": 89, "p90": 320, "p95": 890, "p99": 2100 },
"latencyTrend": [ { "date": "...", "avgMs": 87, "p95Ms": 890 } ],
"slowestEndpoints": [
{ "path": "/v1/contabilidad/reportes/balance-general", "avgMs": 1450, "maxMs": 3200, "count": 45 }
],
"errorsByPath": [ { "path": "/v1/facturas", "count": 3 } ]
} GET /v1/analytics/audit
Trail de auditoría paginado con filtros.
| Query param | Descripción | Default |
|---|---|---|
| range | Rango de tiempo ES | 7d |
| action | Filtrar por acción (UPDATE_PRICE, LOGIN, etc.) | — |
| userId | Filtrar por usuario | — |
| module | Filtrar por módulo | — |
| page | Página (1-based) | 1 |
| limit | Registros por página | 50 |
Índices consultados: zentto-api-audit-*
GET /v1/analytics/debug (solo admins)
Diagnóstico del pipeline: estado del cluster ES, índices activos y muestra de documentos.
// Respuesta
{
"ok": true,
"esHost": "http://172.18.0.1:9200",
"clusterHealth": { "status": "green", ... },
"indices": [ { "index": "zentto-api-logs-2026.03.22", "docs.count": "143", "status": "open" } ],
"sampleDocs": { "zentto-api-logs-2026.03.22": { "@timestamp": "...", "path": "/v1/facturas", ... } }
} Configuración
Variables de entorno en web/api/.env:
KAFKA_ENABLED=true
KAFKA_BROKERS=172.18.0.1:9092 # gateway Docker → host Kafka
SERVICE_NAME=zentto-api
ELASTICSEARCH_HOST=http://172.18.0.1:9200 # para analytics.routes.ts Si KAFKA_ENABLED no está definido o es false, el SDK hace fallback a console.log.
Esto permite desarrollo local sin Kafka.
Seed data para demo / desarrollo
Para poblar los dashboards con datos de prueba usar el script
zentto-infra/observability/elasticsearch/seed-demo-data.sh:
# Genera 30 días de datos demo para companyId=1
bash seed-demo-data.sh http://localhost:9200 1 30
# Argumentos: [ES_HOST] [COMPANY_ID] [DAYS]
# Defaults: http://localhost:9200 1 30 El script genera documentos en los 4 índices principales:
- zentto-api-logs-*: ~20-50 requests/día, horario laboral 8am-8pm
- zentto-api-events-*: ~8-20 eventos de negocio/día
- zentto-api-audit-*: ~5-13 registros de auditoría/día
- zentto-api-performance-*: ~3-7 métricas de endpoints lentos/día
Operación y troubleshooting
Verificar flujo completo
# 1. Verificar que Kafka recibe mensajes (Kafka UI)
https://kafka.zentto.net
# 2. Verificar índices en Elasticsearch
curl http://localhost:9200/_cat/indices/zentto-*?v&s=index
# 3. Verificar un documento reciente
curl http://localhost:9200/zentto-api-logs-*/_search?size=1&sort=@timestamp:desc
# 4. Usar endpoint de debug (requiere token admin)
GET /v1/analytics/debug
Authorization: Bearer <token> Problemas comunes
| Síntoma | Causa probable | Solución |
|---|---|---|
| Dashboard muestra ceros, API responde 200 | Kafka no llega a ES / índices sin datos | Verificar indices con _cat/indices. Revisar logs de Logstash. |
Índices con doble prefijo zentto-zentto-* | Logstash pipeline usa "zentto-%{[topic]}" | Corregir output a "%{[topic]}-%{+YYYY.MM.dd}" y eliminar índices incorrectos. |
Aggregation falla con illegal_argument_exception | userId tiene tipos conflictivos (keyword vs long) en índices pre-template | Esperar 30 días a que ILM elimine índices viejos, o eliminarlos manualmente. |
Aggregation retorna vacío con campo .keyword | Campo es keyword puro — no existe sub-campo .keyword | Usar el nombre directo del campo sin .keyword. |
| userId siempre null en logs | Middleware usaba user?.userId — el JWT usa sub | Usar user?.sub para obtener el id de usuario del JWT. |
| API no llega a Kafka (dentro de Docker) | KAFKA_ADVERTISED_LISTENERS usa localhost | Cambiar a PLAINTEXT_HOST://172.18.0.1:9092 en docker-compose del stack de observabilidad. |
Eliminar índices conflictivos
# Eliminar todos los índices de un día específico
curl -X DELETE http://localhost:9200/zentto-api-logs-2026.03.10
# Eliminar índices con doble prefijo (si ocurrió el problema)
curl -X DELETE "http://localhost:9200/zentto-zentto-*" Checklist para nuevos módulos
- El middleware HTTP cubre las rutas automáticamente (no se necesita nada)
- Agregar el evento a
eventMapenmiddleware/observability.tssi el módulo genera eventos de negocio en POST - Usar
obs.audit()para operaciones sensibles (cambio de precios, permisos, eliminaciones) - Usar
obs.perf()para operaciones potencialmente lentas (reportes, queries complejas) - Usar
obs.error()para errores críticos que requieran visibilidad en dashboard - Verificar en el debug endpoint que los documentos lleguen correctamente a ES